#!/usr/bin/env python
# -*- coding:utf-8 -*-


'''
@CreateTime: 2020/10/27 14:36
@Author : laifuyu
'''

import logging
import random
import copy
import warnings
import gevent
# from time import time
import time

from .rpc import Message
from .exception import RPCError
from .runners import MasterRunner, WorkerRunner, WorkerNode
from .runners import STATE_INIT, STATE_SPAWNING, STATE_RUNNING, STATE_CLEANUP, STATE_STOPPING, STATE_STOPPED, STATE_MISSING
from .runners import HEARTBEAT_LIVENESS, FALLBACK_INTERVAL, HEARTBEAT_INTERVAL
from .runners import greenlet_exception_handler
from .stats import stats_printer, print_stats, chart_stats_collecter, collect_chart_stats

logger = logging.getLogger(__name__)

class LocustMasterRunner(MasterRunner):
    """
    Runner used to run distributed load tests across multiple processes and/or machines.

    LocustMasterRunner doesn't spawn any user greenlets itself. Instead it expects
    :class:`LocustWorkerRunners <LocustWorkerRunner>` to connect to it, which it will then direct
    to start and stop user greenlets. Stats sent back from the
    :class:`LocustWorkerRunners <LocustWorkerRunner>` will be aggregated.
    """

    def __init__(self, environment, master_bind_host, master_bind_port):
        """
        :param environment: Environment instance
        :param master_bind_host: Host/interface to use for incoming worker connections
        :param master_bind_port: Port to use for incoming worker connections
        """
        super().__init__(environment, master_bind_host, master_bind_port)
        # Extra greenlet: periodically check whether every worker has stopped.
        self.greenlet.spawn(self.check_stopped_periodicity).link_exception(greenlet_exception_handler)
        self.target_scenario = None  # target scenario ({chain id: chain config})
        self.hold_load_time = None  # seconds to hold the load once all users are spawned
        self.stop_rate = None  # number of users stopped per second
        self.iteration_num = None  # iterations each user runs (None = time-bound run)
        self.scenario_chain_weight_list = []  # [chain1 weight, chain2 weight, ...]
        self.scenario_chain_id_list = []  # [chain1 id, chain2 id, ...], paired one-to-one with scenario_chain_weight_list
        self.task_min_wait = 0  # minimum wait before a user starts its next task
        self.task_max_wait = 0  # maximum wait before a user starts its next task
        self.task_wait_time = 0  # fixed wait before a user starts its next task
        # Iteration mode (client_listener only accepts values in [1, 2, 3]):
        # 1 - serial: each iteration runs every chain
        # 2 - weighted: each iteration runs a single chain picked by weight
        # 3 - random: each iteration runs a single chain picked randomly by weight
        # (original comment numbered these 1/1/2 — presumably 1/2/3; confirm against workers)
        self.iteration_mode = 2
        self.chain_weight_percent_map = {}  # {chain id: weight percentage, ...}
        self.greenlet_start_user_gradully = None  # greenlet performing gradual ramp-up
        self.greenlet_print_stats = None  # greenlet printing stats reports
        self.greenlet_for_chart_stats = None  # greenlet collecting echart chart statistics
        self.slave_target_scenario_dict = {}  # {worker id: its target scenario config}

    def get_slave_scenario_config(self, scenario_config):
        '''
        Build a worker-specific copy of the scenario config.

        Mainly solves parameter-file distribution: each worker node takes one
        parameter file from every ``read_csv_file_data`` action's ``fileName``
        list. When there are fewer files than worker nodes, the extra nodes
        share the last remaining file.

        Note: ``scenario_config`` is mutated in place (consumed file names are
        popped), and the returned deep copy is what gets sent to the worker.
        '''

        scenario_config_copy = copy.deepcopy(scenario_config)

        def parse_json_data(target_scenario_config, src_scenario_config):
            # Walk the copy and the source in lockstep; assign one file name to
            # the copy and consume it from the source unless it is the last one.
            if isinstance(target_scenario_config, (tuple, list)):
                for index, target_item in enumerate(target_scenario_config):
                    if isinstance(target_item, dict):
                        if 'action' in target_item and target_item['action'].lower().strip() == 'read_csv_file_data':
                            file_names = target_item.get('fileName')
                            if isinstance(file_names, list):
                                file_max_index = len(file_names) - 1
                                if file_max_index > -1:
                                    target_item['fileName'] = file_names[0]
                                    if file_max_index > 0:  # not the last file: consume it from the source
                                        src_scenario_config[index]['fileName'].pop(0)
                                    else:  # last file: keep it (as a plain string) so later nodes share it
                                        src_scenario_config[index]['fileName'] = file_names[0]
                                else:
                                    target_item['fileName'] = ''
                                    src_scenario_config[index]['fileName'] = ''
                        else:
                            parse_json_data(target_item, src_scenario_config[index])
                    elif isinstance(target_item, (tuple, list)):
                        parse_json_data(target_item, src_scenario_config[index])
            elif isinstance(target_scenario_config, dict):
                if 'action' in target_scenario_config and target_scenario_config['action'].lower().strip() == 'read_csv_file_data':
                    file_names = target_scenario_config.get('fileName')
                    if isinstance(file_names, list):
                        file_max_index = len(file_names) - 1
                        if file_max_index > -1:
                            target_scenario_config['fileName'] = file_names[0]
                            if file_max_index > 0:  # not the last file: consume it from the source
                                src_scenario_config['fileName'].pop(0)
                            else:
                                src_scenario_config['fileName'] = file_names[0]
                        else:
                            target_scenario_config['fileName'] = ''
                            src_scenario_config['fileName'] = ''
                else:
                    for target_key in target_scenario_config:
                        if isinstance(target_scenario_config[target_key], (dict, tuple, list)):
                            parse_json_data(target_scenario_config[target_key], src_scenario_config[target_key])

        parse_json_data(scenario_config_copy, scenario_config)
        return scenario_config_copy

    def start(self, user_count, spawn_rate, hold_load_time=None):
        """
        Divide the requested load between the connected workers and send each
        one a "spawn" job.

        :param user_count: total number of users to run across all workers
        :param spawn_rate: total users started per second across all workers
        :param hold_load_time: optional hold duration forwarded to workers
        """
        self.target_user_count = user_count
        num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning)
        if not num_workers:
            logger.warning(
                "You are running in distributed mode but have no worker servers connected. "
                "Please connect workers prior to swarming."
            )
            return

        self.spawn_rate = spawn_rate
        worker_num_users = user_count // (num_workers or 1)
        worker_spawn_rate = float(spawn_rate) / (num_workers or 1)
        remaining = user_count % num_workers

        logger.info(
            "Sending spawn jobs of %d users and %.2f spawn rate to %d ready clients"
            % (worker_num_users, worker_spawn_rate, num_workers)
        )

        if worker_spawn_rate > 100:
            logger.warning(
                "Your selected spawn rate is very high (>100/worker), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"
            )

        if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:
            # Fresh test run: reset the stats and start the reporting greenlets.
            self.stats.clear_all()
            self.exceptions = {}
            self.environment.events.test_start.fire(environment=self.environment)
            self.spawn_stats_printer()
            self.spawn_chart_stats_collector()

        # Chain weight percentages sorted by chain id, descending.
        chain_weight_percent_list = sorted(self.chain_weight_percent_map.items(), key=lambda arg: arg[0], reverse=True)
        for client in self.clients.ready + self.clients.running + self.clients.spawning:
            if client.id not in self.slave_target_scenario_dict:
                # First job for this worker: carve out its own scenario copy
                # (handles the per-worker parameter-file assignment).
                target_scenario = self.get_slave_scenario_config(self.target_scenario)
                self.slave_target_scenario_dict[client.id] = target_scenario
            else:
                target_scenario = self.slave_target_scenario_dict.get(client.id)
            data = {
                "spawn_rate": worker_spawn_rate,
                "worker_num_users": worker_num_users,
                "stop_timeout": self.environment.stop_timeout,
                "stop_rate": self.stop_rate,
                "hold_load_time": hold_load_time,
                "iteration_num": self.iteration_num,
                "host": self.environment.host,
                "scenario_chain_weight_list": self.scenario_chain_weight_list,
                "scenario_chain_id_list": self.scenario_chain_id_list,
                "target_scenario": target_scenario,
                "task_min_wait": self.task_min_wait,
                "task_max_wait": self.task_max_wait,
                "task_wait_time": self.task_wait_time,
                "iteration_mode": self.iteration_mode,
                "chain_weight_percent_list": chain_weight_percent_list
            }

            if remaining > 0:
                # Spread the division remainder one extra user at a time.
                data["worker_num_users"] += 1
                remaining -= 1

            if not data["worker_num_users"]:
                continue

            logger.debug("Sending spawn message to client %s" % (client.id))
            self.server.send_to_client(Message("spawn", data, client.id))
        self.update_state(STATE_SPAWNING)

    def stop(self, force=False):
        """Stop the test: halt the ramp-up greenlet, release resources and tell every worker to stop."""
        if self.state not in [STATE_INIT, STATE_STOPPED, STATE_STOPPING] or force:
            logger.debug("Stopping...")
            self.stop_greenlet_start_user_gradully()  # stop the gradual ramp-up greenlet
            self.release_resource()  # release per-run resources

            self.update_state(STATE_STOPPING)

            if self.environment.shape_class:
                self.shape_last_state = None

            for client in self.clients.all:
                logger.debug("Sending stop message to client %s" % (client.id))
                self.server.send_to_client(Message("stop", None, client.id))

            self.environment.events.test_stop.fire(environment=self.environment)

    def compute_chain_weight_percent(self, target_scenario):
        '''
        Fill ``self.chain_weight_percent_map`` with each chain's weight
        percentage (chain weight / sum of all chain weights).

        When every weight is zero/missing the percentage is split evenly.
        The last chain always receives the leftover so the percentages sum
        to exactly 1.

        :param target_scenario: iterable of chain dicts with 'id' and 'weight'
        '''
        chain_total_weight = 0  # sum of all chain weights
        chain_total_num = 0  # number of chains
        for scenario_chain in target_scenario:
            chain_weight = scenario_chain.get('weight') or 0
            chain_total_weight += chain_weight
            self.chain_weight_percent_map[scenario_chain['id']] = chain_weight
            chain_total_num += 1

        temp_count = 1
        last_chain_weight_percent = 1  # percentage left over for the last chain
        if chain_total_weight:
            for chain_id, weight in self.chain_weight_percent_map.items():
                if temp_count < chain_total_num:
                    temp_percent = weight / chain_total_weight
                    self.chain_weight_percent_map[chain_id] = temp_percent
                    last_chain_weight_percent -= temp_percent
                else:  # last entry: take whatever percentage is left
                    self.chain_weight_percent_map[chain_id] = last_chain_weight_percent
                temp_count += 1
        else:  # all weights are zero: split evenly
            temp_percent = 1 / chain_total_num
            for chain_id, weight in self.chain_weight_percent_map.items():
                if temp_count < chain_total_num:
                    self.chain_weight_percent_map[chain_id] = temp_percent
                    last_chain_weight_percent -= temp_percent
                else:
                    self.chain_weight_percent_map[chain_id] = last_chain_weight_percent
                temp_count += 1

    def check_stopped_periodicity(self):
        '''Every 10 heartbeat intervals, check whether the whole test has stopped.'''
        while True:
            gevent.sleep(HEARTBEAT_INTERVAL * 10)
            self.check_stopped()

    def check_stopped(self):
        '''
        If the test was started and no client is running/spawning/cleaning up
        any more: print the final stats, collect the chart statistics, kill the
        reporting greenlets and mark the run stopped.
        '''
        if (
            not self.state == STATE_INIT
            and not self.state == STATE_STOPPED
            and all(map(lambda x: x.state != STATE_RUNNING and x.state != STATE_SPAWNING and x.state != STATE_CLEANUP, self.clients.all))
        ):
            print_stats(self.environment.stats)
            if self.greenlet_print_stats:
                self.greenlet_print_stats.kill(block=True)
                self.greenlet_print_stats = None

            try:
                collect_chart_stats(self)
            except Exception as e:
                # Best effort: a chart-stats failure must not block the state transition.
                logger.error('collect chart statistics failure：%s' % e)

            if self.greenlet_for_chart_stats:
                self.greenlet_for_chart_stats.kill(block=True)
                self.greenlet_for_chart_stats = None
            self.update_state(STATE_STOPPED)

    def spawn_stats_printer(self):
        '''Start the stats-printing greenlet if it is not already running.'''
        if not self.greenlet_print_stats:
            self.greenlet_print_stats = self.greenlet.spawn(stats_printer(self.environment.stats))
            self.greenlet_print_stats.link_exception(greenlet_exception_handler)

    def spawn_chart_stats_collector(self):
        '''Start the echart statistics collector greenlet if it is not already running.'''
        if not self.greenlet_for_chart_stats:
            self.greenlet_for_chart_stats = self.greenlet.spawn(chart_stats_collecter(self))
            self.greenlet_for_chart_stats.link_exception(greenlet_exception_handler)

    def client_listener(self):
        '''
        Main receive loop: dispatch on every message type coming from worker
        and control clients (readiness, heartbeats, stats, and the custom
        load-test control commands).
        '''
        while True:
            try:
                client_id, msg = self.server.recv_from_client()
            except RPCError as e:
                logger.error("RPCError found when receiving from client: %s" % (e))
                self.connection_broken = True
                gevent.sleep(FALLBACK_INTERVAL)
                continue
            self.connection_broken = False
            msg.node_id = client_id
            if msg.type == "client_ready":
                id = msg.node_id
                self.clients[id] = WorkerNode(id, heartbeat_liveness=HEARTBEAT_LIVENESS)
                logger.info(
                    "Client %r reported as ready. Currently %i clients ready to swarm."
                    % (id, len(self.clients.ready + self.clients.running + self.clients.spawning))
                )
            elif msg.type == "client_stopped":
                del self.clients[msg.node_id]
                logger.info("Removing %s client from running clients" % (msg.node_id))
                self.check_stopped()
            elif msg.type == "heartbeat":
                if msg.node_id in self.clients:
                    c = self.clients[msg.node_id]
                    c.heartbeat = HEARTBEAT_LIVENESS
                    c.state = msg.data["state"]
                    c.cpu_usage = msg.data["current_cpu_usage"]
                    if not c.cpu_warning_emitted and c.cpu_usage > 90:
                        self.worker_cpu_warning_emitted = True  # used to fail the test in the end
                        c.cpu_warning_emitted = True  # used to suppress logging for this node
                        logger.warning(
                            "Worker %s exceeded cpu threshold (will only log this once per worker)" % (msg.node_id)
                        )
                else:
                    if self.state in [STATE_SPAWNING, STATE_RUNNING]:  # don't accept new clients while a test is running
                        continue
                    id = msg.node_id
                    self.clients[id] = WorkerNode(id, heartbeat_liveness=HEARTBEAT_LIVENESS)
            elif msg.type == "stats":
                self.environment.events.worker_report.fire(client_id=msg.node_id, data=msg.data)
            elif msg.type == "spawning":
                self.clients[msg.node_id].state = STATE_SPAWNING
            elif msg.type == "spawning_complete":
                self.clients[msg.node_id].state = STATE_RUNNING
                self.clients[msg.node_id].user_count = msg.data["count"]
                if len(self.clients.spawning) == 0:
                    count = sum(c.user_count for c in self.clients.values())
                    self.environment.events.spawning_complete.fire(user_count=count)
            elif msg.type == "quit":
                if msg.node_id in self.clients:
                    del self.clients[msg.node_id]
                    logger.info(
                        "Client %r quit. Currently %i clients connected." % (msg.node_id, len(self.clients.ready))
                    )
                    if self.worker_count - len(self.clients.missing) <= 0:
                        logger.info("The last worker quit, stopping test.")
                        self.stop()
                        if self.environment.parsed_options and self.environment.parsed_options.headless:
                            self.quit()
            elif msg.type == "exception":
                self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])
            elif msg.type == "runLoadTest":  # start a load test
                try:
                    if self.state == STATE_RUNNING or self.state == STATE_SPAWNING:
                        self.server.send_to_client(Message("result", {"success":False, "msg": "正在执行压测，请在压测结束后重试"}, msg.node_id))
                        continue

                    target_user_count = msg.data.get('target_user_count') or 1  # total number of users to start
                    self.spawn_rate = msg.data.get('spawn_rate') or 5  # users started per second
                    user_add_each_interval = msg.data.get('add')  # users added per interval (gradual ramp-up)
                    initial_user_num = msg.data.get('initial_user_num') or user_add_each_interval  # initial user count
                    add_user_interval = msg.data.get('interval') or 0  # seconds between user additions
                    self.iteration_num = msg.data.get('iteration_num_each_user')
                    # A positive iteration count takes precedence over a hold time.
                    self.hold_load_time = msg.data.get('hold_load_time') if not (isinstance(self.iteration_num, int) and self.iteration_num > 0) else None
                    self.stop_rate = msg.data.get('stop_rate') or 1
                    self.environment.host = msg.data.get('host')
                    if msg.data.get("iteration_min_wait"):
                        self.task_min_wait = msg.data.get("iteration_min_wait")
                    if msg.data.get("iteration_max_wait"):
                        self.task_max_wait = msg.data.get("iteration_max_wait")
                    if msg.data.get("iteration_wait_time"):
                        self.task_wait_time = msg.data.get("iteration_wait_time")
                    if msg.data.get("iteration_mode") and msg.data.get("iteration_mode") in [1, 2, 3]:
                        self.iteration_mode = msg.data.get("iteration_mode")
                    self.target_scenario = {}
                    self.scenario_chain_id_list = []
                    self.scenario_chain_weight_list = []

                    for scenario_chain in msg.data.get('scenario'):
                        self.target_scenario[scenario_chain['id']] = scenario_chain
                        self.scenario_chain_id_list.append(scenario_chain['id'])
                        self.scenario_chain_weight_list.append(scenario_chain['weight'])

                    if not self.target_scenario:
                        self.server.send_to_client(Message("result", {"success":False, "msg":"未找到场景链路，启动压测失败"}, msg.node_id))
                        continue

                    self.compute_chain_weight_percent(msg.data.get('scenario'))

                    if user_add_each_interval and user_add_each_interval < target_user_count:
                        # Gradual ramp-up requested; refuse if one is already in progress.
                        if self.greenlet_start_user_gradully:
                            self.server.send_to_client(Message("result", {"success":False, "msg":"操作失败，正在执行梯度加压"}, msg.node_id))
                            continue

                        self.greenlet_start_user_gradully = self.greenlet.spawn(
                            self.start_user_gradually, initial_user_num, user_add_each_interval, add_user_interval, target_user_count
                        )
                        self.greenlet_start_user_gradully.link_exception(greenlet_exception_handler)
                    else:
                        self.start(target_user_count, self.spawn_rate, self.hold_load_time)
                    self.server.send_to_client(Message("result", {"success":True, "msg":"正在启动压测"}, msg.node_id))
                except Exception as e:
                    self.server.send_to_client(Message("result", {"success":False, "msg": "启动压测出错：%s" % e}, msg.node_id))
            elif msg.type == "stopLoadTest":  # stop the load test
                try:
                    self.stop(True)
                    self.server.send_to_client(Message("result", {"success":True, "msg":"正在停止压测"}, msg.node_id))
                except Exception as e:
                    self.server.send_to_client(Message("result", {"success":False, "msg":"停止压测出错：%s" % e}, msg.node_id))
            elif msg.type == "resetSpawnRate":  # adjust users started per second
                try:
                    if self.state != STATE_SPAWNING:
                        self.spawn_rate = msg.data.get('spawn_rate')
                        self.server.send_to_client(Message("result", {"success":True, "msg":"调整每秒启动用户数成功"}, msg.node_id))
                    else:
                        self.server.send_to_client(Message("result", {"success":False, "msg":"正在生成压测用户，请稍候再试"}, msg.node_id))
                except Exception as e:
                    self.server.send_to_client(Message("result", {"success":False, "msg":"调整每秒启动用户数出错：%s" % e}, msg.node_id))
            elif msg.type == "resetStopRate":  # adjust users stopped per second
                try:
                    if self.state != STATE_STOPPING:
                        self.stop_rate = msg.data.get('stop_rate')
                        self.server.send_to_client(Message("result", {"success":True, "msg":"调整每秒停止用户数成功"}, msg.node_id))
                    else:
                        self.server.send_to_client(Message("result", {"success":False, "msg":"正在停止压测用户，请稍候再试"}, msg.node_id))
                except Exception as e:
                    # Fixed copy-paste bug: this error message previously said "启动" (start) instead of "停止" (stop).
                    self.server.send_to_client(Message("result", {"success":False, "msg":"调整每秒停止用户数出错：%s" % e}, msg.node_id))
            elif msg.type == "quitMaster":  # quit the master process
                try:
                    self.quit()
                    self.server.send_to_client(Message("quit", {"success":True, "msg":"正在退出Master程序"}, msg.node_id))
                except Exception as e:
                    self.server.send_to_client(Message("quit", {"success":False, "msg":"退出Master程序出错：%s" % e}, msg.node_id))

    def start_user_gradually(self, initial_user_num, user_add_each_interval, add_user_interval, target_user_count):
        '''
        Gradual ramp-up: raise the running user count step by step until the
        target count is reached, then launch the final step with the hold time.

        :param initial_user_num: user count of the first step
        :param user_add_each_interval: users added per step
        :param add_user_interval: seconds between steps
        :param target_user_count: final total user count
        '''
        try:
            current_total_user = initial_user_num
            while current_total_user < target_user_count:
                start_time = time.time()
                self.start(current_total_user, self.spawn_rate)
                current_total_user += min(user_add_each_interval, target_user_count - current_total_user)
                # Sleep whatever is left of the interval after start() returned.
                gevent.sleep(max(0, add_user_interval - time.time() + start_time))
            else:
                # Loop finished normally: final step carries the hold time.
                self.spawn_target_user_count = True
                self.start(current_total_user, self.spawn_rate, self.hold_load_time)
        except Exception as e:
            logger.error('梯度加压出错：%s' % e)
        finally:
            # NOTE(review): this kills the current greenlet from within itself,
            # which also clears the bookkeeping reference.
            self.stop_greenlet_start_user_gradully()

    def stop_greenlet_start_user_gradully(self):
        '''Kill the gradual ramp-up greenlet, if any, and clear its reference.'''

        if self.greenlet_start_user_gradully:
            self.greenlet_start_user_gradully.kill(block=True)
            self.greenlet_start_user_gradully = None

    def release_resource(self):
        '''Release per-run resources kept on the master (per-worker scenario copies).'''

        self.slave_target_scenario_dict = {}




class LocustWorkerRunner(WorkerRunner):

    """
    Runner used to run distributed load tests across multiple processes and/or machines.

    LocustWorkerRunner connects to a :class:`MasterRunner` from which it'll receive
    instructions to start and stop user greenlets. The WorkerRunner will periodically
    take the stats generated by the running users and send back to the :class:`MasterRunner`.
    """

    def __init__(self, environment, master_host, master_port):
        """
        :param environment: Environment instance
        :param master_host: Host/IP to use for connection to the master
        :param master_port: Port to use for connecting to the master
        """
        super().__init__(environment, master_host, master_port)
        self.scenario_chain_weight_list = []  # chain weights, paired with scenario_chain_id_list
        self.scenario_chain_id_list = []  # chain ids
        self.target_scenario = None  # scenario config received from the master
        self.iteration_num = None  # iterations each user runs (None = time-bound run)
        self.hold_load_time = None  # load-hold duration (excludes ramp-up and ramp-down time)
        self.task_min_wait = 0  # minimum wait before a user starts its next task
        self.task_max_wait = 0  # maximum wait before a user starts its next task
        self.task_wait_time = 0  # fixed wait before a user starts its next task
        # Iteration mode (master validates values in [1, 2, 3]):
        # 1 - serial: each iteration runs every chain
        # 2 - weighted: each iteration runs a single chain picked by weight
        # 3 - random: each iteration runs a single chain picked randomly by weight
        self.iteration_mode = 2
        self.chain_weight_percent_list = []  # chain weight percentages received from the master
        self.greenlet_stop_runner_later = None  # greenlet that stops the runner after the hold time
        self.greenlet_print_stats = None  # greenlet that periodically prints performance stats

        self.user_share_resouce_dict = {}  # resources shared between users (files excluded)
        self.user_share_resouce_dict['active_user_count'] = 0  # counter of active users
        self.file_data_dict = {}  # open file handles, their data, etc.

    def spawn_users(self, spawn_count, spawn_rate, wait=False):
        """
        Spawn ``spawn_count`` users at ``spawn_rate`` users/second.

        When the last user is spawned and the run is time-bound (no iteration
        count but a hold time is set), schedule a delayed stop of the runner.

        :param wait: when True, block until all spawned users have stopped
        """
        bucket = self.weight_users(spawn_count)
        spawn_count = len(bucket)
        if self.state == STATE_INIT or self.state == STATE_STOPPED:
            self.update_state(STATE_SPAWNING)

        existing_count = len(self.user_greenlets)
        logger.info(
            "Spawning %i users at the rate %g users/s (%i users already running)..."
            % (spawn_count, spawn_rate, existing_count)
        )
        occurrence_count = {user_cls.__name__: 0 for user_cls in self.user_classes}

        def spawn():
            sleep_time = 1.0 / spawn_rate
            while True:
                if not bucket:
                    logger.info(
                        "All users spawned: %s (%i already running)"
                        % (
                            ", ".join(["%s: %d" % (name, count) for name, count in occurrence_count.items()]),
                            existing_count,
                        )
                    )
                    # Whether or not this is a gradual ramp-up, self.hold_load_time
                    # is None unless spawn_count is the final target user count.
                    # Schedule the delayed stop only for a time-bound run.
                    if not self.iteration_num and self.hold_load_time is not None:
                        if self.greenlet_stop_runner_later:
                            self.greenlet_stop_runner_later.kill(block=True)
                            self.greenlet_stop_runner_later = None
                        self.greenlet_stop_runner_later = self.greenlet.spawn(
                                self.stop_runner_later, self.hold_load_time
                            )
                        self.greenlet_stop_runner_later.link_exception(greenlet_exception_handler)
                        self.hold_load_time = None  # reset the flag
                    self.environment.events.spawning_complete.fire(user_count=len(self.user_greenlets))
                    return

                user_class = bucket.pop(random.randint(0, len(bucket) - 1))
                occurrence_count[user_class.__name__] += 1
                # Propagate the wait-time settings onto the user class before instantiation.
                user_class.min_wait = self.task_min_wait
                user_class.max_wait = self.task_max_wait
                user_class.task_wait_time = self.task_wait_time
                new_user = user_class(self.environment)
                new_user.start(self.user_greenlets)
                if len(self.user_greenlets) % 10 == 0:
                    logger.debug("%i users spawned" % len(self.user_greenlets))
                if bucket:
                    gevent.sleep(sleep_time)

        spawn()
        if wait:
            self.user_greenlets.join()
            logger.info("All users stopped\n")

    def heartbeat(self):
        '''Send a heartbeat (state + cpu usage) to the master every HEARTBEAT_INTERVAL seconds.'''
        while True:
            # If every user greenlet has finished while we were "running", stop the runner.
            if not len(self.user_greenlets) and self.worker_state == STATE_RUNNING:
                self.stop_runner()
            try:
                self.client.send(
                    Message(
                        "heartbeat",
                        {"state": self.worker_state, "current_cpu_usage": self.current_cpu_usage},
                        self.client_id,
                    )
                )
            except RPCError as e:
                logger.error("RPCError found when sending heartbeat: %s" % (e))
                self.reset_connection()
            gevent.sleep(HEARTBEAT_INTERVAL)

    def worker(self):
        '''Main receive loop: handle "spawn", "stop" and "quit" messages from the master.'''
        while True:
            try:
                msg = self.client.recv()
            except RPCError as e:
                logger.error("RPCError found when receiving from master: %s" % (e))
                continue
            if msg.type == "spawn":
                if not self.greenlet_print_stats:
                    self.greenlet_print_stats = self.greenlet.spawn(stats_printer(self.environment.stats))
                    self.greenlet_print_stats.link_exception(greenlet_exception_handler)

                self.worker_state = STATE_SPAWNING
                self.client.send(Message("spawning", None, self.client_id))
                # Adopt the job parameters sent by the master.
                job = msg.data
                self.spawn_rate = job["spawn_rate"]
                self.target_user_count = job["worker_num_users"]
                self.environment.host = job["host"]
                self.environment.stop_timeout = job["stop_timeout"]
                self.stop_rate = job["stop_rate"]
                self.hold_load_time = job["hold_load_time"]
                self.iteration_num = job["iteration_num"]
                self.scenario_chain_weight_list = job["scenario_chain_weight_list"]
                self.scenario_chain_id_list = job["scenario_chain_id_list"]
                self.target_scenario = job["target_scenario"]
                self.task_min_wait = job["task_min_wait"]
                self.task_max_wait = job["task_max_wait"]
                self.task_wait_time = job["task_wait_time"]
                self.iteration_mode = job["iteration_mode"]
                self.chain_weight_percent_list = job["chain_weight_percent_list"]

                if self.spawning_greenlet:
                    # kill existing spawning greenlet before we launch new one
                    self.spawning_greenlet.kill(block=True)
                self.spawning_greenlet = self.greenlet.spawn(
                    lambda: self.start(user_count=self.target_user_count, spawn_rate=job["spawn_rate"])
                )
                self.spawning_greenlet.link_exception(greenlet_exception_handler)
            elif msg.type == "stop":
                logger.info("Got stop message from master, stop runner...")
                self.stop_runner()
            elif msg.type == "quit":
                logger.info("Got quit message from master, shutting down...")
                self.stop()
                self.greenlet.kill(block=True)

    def stop_runner_later(self, hold_load_time):
        '''Sleep for ``hold_load_time`` seconds, then stop the runner.

        Falsy or string values are treated as 0 (stop immediately).
        '''
        if not hold_load_time or isinstance(hold_load_time, str):
            hold_load_time = 0
        gevent.sleep(hold_load_time)
        self.stop_runner()

    def stop_runner(self):
        '''Stop the runner and report back to the master as stopped and ready again.'''

        if self.worker_state in [STATE_INIT, STATE_CLEANUP, STATE_STOPPING, STATE_STOPPED]:
            return
        self.stop()
        self.client.send(Message("client_stopped", None, self.client_id))
        self.client.send(Message("client_ready", None, self.client_id))
        self.worker_state = STATE_INIT

    def stop(self):
        """
        Stop a running load test by stopping all running users
        """
        try:
            logger.debug("Stopping all users")
            self.update_state(STATE_CLEANUP)
            # if we are currently spawning users we need to kill the spawning greenlet first
            if self.spawning_greenlet and not self.spawning_greenlet.ready():
                self.spawning_greenlet.kill(block=True)
            if hasattr(self, 'stop_rate'):
                self.stop_users(self.user_count, self.stop_rate)
            else:
                self.stop_users(self.user_count)

            self._send_stats()  # send a final report, in case there were any samples not yet reported
            self.update_state(STATE_STOPPED)

            self.cpu_log_warning()
        finally:
            # Always release resources, even when stopping users raised.
            self.release_resource()

    def release_resource(self):
        '''Release and clear resources (open files, shared dicts) after the load test finishes.'''

        try:
            logger.info('load test finished，ready to release and clear resources')
            # Close every parameter-file handle that was opened during the run.
            for filename in self.file_data_dict:
                if 'file_handler' in self.file_data_dict[filename]:
                    self.file_data_dict[filename]['file_handler'].close()

            self.user_resource_dict = {}  # per-user private resources
            self.user_share_resouce_dict = {}  # resources shared between users (files excluded)
            self.user_share_resouce_dict['active_user_count'] = 0  # counter of active users
            self.file_data_dict = {}  # open file handles, their data, etc.
        finally:
            # Always kill the stats-printing greenlet, even if cleanup failed.
            if self.greenlet_print_stats:
                self.greenlet_print_stats.kill(block=True)
                self.greenlet_print_stats = None


